/*
 * Copyright (c) 2021 The red-star Project
 *
 * Licensed under the Apache License, version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at:
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.inyourcode.core.transport.session;

import com.inyourcode.core.cluster.ClusterNodeManager;
import com.inyourcode.core.util.StackTraceUtil;
import com.inyourcode.core.util.Strings;
import com.inyourcode.core.threads.api.ExecutorFactory;
import com.inyourcode.core.threads.ExecutorFactoryManager;
import com.inyourcode.core.threads.api.HashExecutor;
import com.inyourcode.core.threads.api.ScheduleExecutor;
import com.inyourcode.core.transport.api.channel.JChannel;
import com.inyourcode.core.transport.api.processor.ConsumerProcessor;
import com.inyourcode.core.transport.netty.channel.NettyChannel;
import com.inyourcode.core.transport.session.api.AsyncRequest;
import com.inyourcode.core.transport.session.api.FixedRequest;
import com.inyourcode.core.transport.session.api.ISessionInterceptor;
import com.inyourcode.core.transport.session.api.MessageHolder;
import com.inyourcode.core.transport.session.api.Session;
import io.netty.channel.ChannelHandlerContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.lang.reflect.InvocationTargetException;

/**
 * Basic implementation of the consumer-side message processor.
 * <ul>
 *     <li>Looks up the handler for an incoming message and dispatches the message to it.</li>
 *     <li>The original note states that messages are handled on the netty IO thread by default.
 *     NOTE(review): in this class every accepted task is submitted to either the hash executor
 *     or the schedule executor - confirm which thread those executors actually run on.</li>
 *     <li>The annotations {@link AsyncRequest} and {@link FixedRequest} allow a message to be
 *     delivered to a specific thread pool.</li>
 * </ul>
 *
 * @author JackLei
 **/
public class BasicConsumerProcessor implements ConsumerProcessor<ResponseContext> {
    protected static final Logger LOGGER = LoggerFactory.getLogger(BasicConsumerProcessor.class);

    /** Consulted before every dispatch; a {@code false} result drops the message. */
    private final ISessionInterceptor interceptor;
    /** Executor addressed by a hash key (the session's hash code is used as the key here). */
    private final HashExecutor hashExecutor;
    /** Executor used for no-auth and async requests. */
    private final ScheduleExecutor scheduleExecutor;

    /**
     * Creates a processor using the default interceptor and executors obtained from
     * {@link ExecutorFactoryManager}.
     */
    public BasicConsumerProcessor() {
        this.interceptor = ISessionInterceptor.DEFALUT;
        this.hashExecutor = ExecutorFactoryManager.newExecutor(ExecutorFactory.Target.HASH);
        this.scheduleExecutor = ExecutorFactoryManager.newExecutor(ExecutorFactory.Target.SCHEDULE);
    }

    /**
     * Handles one incoming message: resolves the session bound to the channel and the handler
     * registered for the message's invoke id, logs the message, and hands it to
     * {@link #dispatch(SessionConnectorTask)}.
     *
     * @param channel         the channel the message arrived on; cast to {@link NettyChannel}
     * @param responseContext the decoded message context
     * @throws Exception if session lookup, handler lookup or logging fails
     */
    @Override
    public void handleResponse(JChannel channel, ResponseContext responseContext) throws Exception {
        long invokeId = responseContext.getInvokeId();
        byte serializerCode = responseContext.getSerializerCode();

        Session session = SessionService.getSessionByChannel((NettyChannel) channel);
        // Lazily give the session an executor the first time a message arrives for it.
        if (session.getExecutor() == null) {
            session.setExecutor(hashExecutor);
        }

        HandlerWrapper wrapper = SessionHandlerMapping.getHandlerWrapper(invokeId);

        // Guarded so the JSON rendering of the body is skipped when INFO is disabled.
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("receive request message from server, invokeId: {}, traceId: {}, body: {}",
                    invokeId, responseContext.getTraceId(), Strings.netMsgToJSON(serializerCode, responseContext.getData()));
        }

        dispatch(new SessionConnectorTask(session, wrapper, responseContext));
    }

    /**
     * Called when a channel becomes active: increments the cluster node load (when a
     * {@link ClusterNodeManager} instance exists), creates the session for the channel and
     * notifies the session listeners.
     *
     * @param ctx the netty handler context of the activated channel
     * @throws Exception if session creation or listener notification fails
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ClusterNodeManager nodeManager = ClusterNodeManager.getInstance();
        if (nodeManager != null) {
            nodeManager.incNodeLoad();
        }
        Session session = SessionService.create(ctx.channel());
        SessionListenerService.listenOpen(session);
    }

    /**
     * Called when a channel becomes inactive: decrements the cluster node load (when a
     * {@link ClusterNodeManager} instance exists), removes the session bound to the channel
     * and notifies the session listeners.
     *
     * @param ctx the netty handler context of the deactivated channel
     * @throws Exception if session removal or listener notification fails
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        ClusterNodeManager nodeManager = ClusterNodeManager.getInstance();
        if (nodeManager != null) {
            nodeManager.deIncNodeLoad();
        }
        Session removed = SessionService.remove(ctx.channel());
        SessionListenerService.listenClose(removed);
        // TODO release resources when the connection is dropped abnormally
    }

    /**
     * Routes a task to an executor. Checks are applied in this order:
     * <ol>
     *     <li>the interceptor may reject the message, in which case nothing is executed;</li>
     *     <li>a message without a resolved handler is logged and dropped;</li>
     *     <li>no-auth requests go to the schedule executor;</li>
     *     <li>any other request on an unauthenticated session is logged and dropped;</li>
     *     <li>async requests go to the schedule executor;</li>
     *     <li>everything else (fixed requests included) goes to the hash executor, keyed by
     *     the session's hash code.</li>
     * </ol>
     * Any exception raised while routing is logged and not rethrown.
     *
     * @param task the task holding the session, handler and message to route
     */
    protected void dispatch(SessionConnectorTask task) {
        try {
            HandlerWrapper wrapper = task.wrapper;
            Object message = task.requestContext.getData();
            Session session = task.session;

            if (!this.interceptor.intercept(wrapper, message, session)) {
                return;
            }

            // Explicit guard instead of relying on a NullPointerException being swallowed by
            // the catch block below when no handler was resolved for the message.
            if (wrapper == null) {
                LOGGER.error("no handler found for message, traceId: {}, channel = {}",
                        task.requestContext.getTraceId(), session.channel());
                return;
            }

            if (wrapper.isNoAuthReq()) {
                this.scheduleExecutor.execute(task);
                return;
            }

            if (!session.isAuth()) {
                // TODO close the connection
                LOGGER.error("session is forbid, channel = {}", session.channel());
                return;
            }

            if (wrapper.isAsyncReq()) {
                this.scheduleExecutor.execute(task);
                return;
            }

            // Fixed requests and unannotated requests are routed identically.
            this.hashExecutor.execute(session.hashCode(), task);
        } catch (Exception ex) {
            LOGGER.error("network request processing failed,{}", StackTraceUtil.stackTrace(ex));
        }
    }

    /**
     * Unit of work that invokes a handler for one message on behalf of a session.
     */
    class SessionConnectorTask implements Runnable {
        private final Session session;
        private final HandlerWrapper wrapper;
        private final MessageHolder requestContext;

        /**
         * @param session        the session the message belongs to
         * @param wrapper        the handler to invoke
         * @param requestContext the message passed to the handler
         */
        public SessionConnectorTask(Session session, HandlerWrapper wrapper, MessageHolder requestContext) {
            this.session = session;
            this.wrapper = wrapper;
            this.requestContext = requestContext;
        }

        /**
         * Invokes the handler; reflective invocation failures are logged and not rethrown.
         */
        @Override
        public void run() {
            try {
                wrapper.invoke(session, requestContext);
            } catch (InvocationTargetException e) {
                logInvokeFailure(e);
            } catch (IllegalAccessException e) {
                logInvokeFailure(e);
            }
        }

        /** Logs a failed handler invocation together with the session id and trace id. */
        private void logInvokeFailure(Exception e) {
            LOGGER.error("session message invoke failed, id: {}, traceId: {}, exception: {}", session.getId(), requestContext.getTraceId(), StackTraceUtil.stackTrace(e));
        }
    }

}
